package org.codehaus.activemq.transport.composite;

import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.Packet;
import org.codehaus.activemq.message.PacketListener;
import org.codehaus.activemq.message.Receipt;
import org.codehaus.activemq.message.WireFormat;
import org.codehaus.activemq.transport.AbstractTransportChannel;
import org.codehaus.activemq.transport.TransportChannel;
import org.codehaus.activemq.transport.TransportChannelProvider;

/**
 * A transport channel that is configured with several candidate URIs and delegates
 * all traffic to one underlying {@link TransportChannel} connected to one of them.
 * <p>
 * On {@link #start()} the candidate URIs are tried in random order. If none of them
 * accepts a connection the whole list is retried, sleeping between passes with a
 * delay that doubles each time, for at most {@link #retryCount} passes.
 */
public class CompositeTransportChannel extends AbstractTransportChannel
{
  private static final Log log = LogFactory.getLog(CompositeTransportChannel.class);

  /** Wire format handed to every underlying channel that is created. */
  protected WireFormat wireFormat;

  /** Candidate URIs to connect to. */
  protected URI[] uris;

  /** The underlying channel currently in use, or <code>null</code> when there is none. */
  protected TransportChannel channel;

  /** Flipped to <code>true</code> by the first call to {@link #stop()}. */
  protected SynchronizedBoolean closed;

  /** Flipped to <code>true</code> by the first call to {@link #start()}. */
  protected SynchronizedBoolean started;

  /** Maximum number of passes over the URI list before giving up. */
  protected int retryCount = 10;

  /** Delay in milliseconds before the second pass; doubled after every pass. */
  protected long failureSleepTime = 500L;

  /** The URI the current underlying channel was successfully connected to. */
  protected URI currentURI;

  /**
   * Creates a channel that is not yet connected; call {@link #start()} to connect.
   *
   * @param wireFormat the wire format passed on to the underlying channel
   * @param uris the candidate URIs to connect to
   */
  public CompositeTransportChannel(WireFormat wireFormat, URI[] uris)
  {
    this.wireFormat = wireFormat;
    this.uris = uris;
    this.closed = new SynchronizedBoolean(false);
    this.started = new SynchronizedBoolean(false);
  }

  /**
   * @return a description that includes the underlying channel (or "null")
   */
  public String toString() {
    return "CompositeTransportChannel: " + this.channel;
  }

  /**
   * Connects to one of the candidate URIs. Only the first invocation has any
   * effect; later invocations return immediately.
   *
   * @throws JMSException if no connection could be established
   */
  public void start() throws JMSException {
    if (this.started.commit(false, true)) {
      establishConnection();
    }
  }

  /**
   * Stops the underlying channel, if there is one, and then delegates to the
   * superclass. Only the first invocation has any effect.
   * <p>
   * The superclass is stopped even when no underlying channel exists (for example
   * because the channel was never started or every connection attempt failed);
   * otherwise the <code>closed</code> flag would be set without the superclass ever
   * being stopped, and no later call could make up for it.
   */
  public void stop()
  {
    if (!this.closed.commit(false, true)) {
      return;
    }
    TransportChannel current = this.channel;
    try {
      if (current != null) {
        current.stop();
      }
    }
    catch (Exception e) {
      log.warn("Caught while closing: " + e + ". Now Closed", e);
    }
    finally {
      this.channel = null;
      super.stop();
    }
  }

  /**
   * Sends a packet through the underlying channel.
   *
   * @param packet the packet to send
   * @return whatever the underlying channel returns
   * @throws JMSException if there is no underlying channel or the send fails
   */
  public Receipt send(Packet packet)
    throws JMSException
  {
    return getChannel().send(packet);
  }

  /**
   * Sends a packet through the underlying channel, passing the timeout along.
   *
   * @param packet the packet to send
   * @param timeout the timeout handed to the underlying channel
   * @return whatever the underlying channel returns
   * @throws JMSException if there is no underlying channel or the send fails
   */
  public Receipt send(Packet packet, int timeout) throws JMSException
  {
    return getChannel().send(packet, timeout);
  }

  /**
   * Hands a packet to the underlying channel's <code>asyncSend</code>.
   *
   * @param packet the packet to send
   * @throws JMSException if there is no underlying channel or the send fails
   */
  public void asyncSend(Packet packet) throws JMSException
  {
    getChannel().asyncSend(packet);
  }

  /**
   * Registers the listener on this channel and, when connected, on the
   * underlying channel as well.
   *
   * @param listener the packet listener to register
   */
  public void setPacketListener(PacketListener listener) {
    super.setPacketListener(listener);
    // Read the field once so the null check and the call see the same channel.
    TransportChannel current = this.channel;
    if (current != null) {
      current.setPacketListener(listener);
    }
  }

  /**
   * Registers the listener on this channel and, when connected, on the
   * underlying channel as well.
   *
   * @param listener the exception listener to register
   */
  public void setExceptionListener(ExceptionListener listener)
  {
    super.setExceptionListener(listener);
    // Read the field once so the null check and the call see the same channel.
    TransportChannel current = this.channel;
    if (current != null) {
      current.setExceptionListener(listener);
    }
  }

  /**
   * @return always <code>false</code>
   */
  public boolean isMulticast()
  {
    return false;
  }

  /**
   * Tries to connect to one of the candidate URIs, making up to
   * {@link #retryCount} passes over the list. Within a pass the URIs are tried in
   * random order; between passes the thread sleeps, starting with
   * {@link #failureSleepTime} milliseconds and doubling each time.
   * <p>
   * A channel that fails during connection or configuration is stopped and
   * discarded, so {@link #getChannel()} never hands out a channel that did not
   * connect. If the thread is interrupted while sleeping, its interrupt status is
   * restored and no further passes are made.
   *
   * @throws JMSException if no URIs are configured or none could be connected to;
   *         the last underlying failure, if any, is attached as the linked exception
   */
  protected void establishConnection()
    throws JMSException
  {
    // Nothing to try: fail immediately instead of sleeping through every retry.
    if ((this.uris == null) || (this.uris.length == 0)) {
      throw createConnectFailure(null);
    }

    boolean connected = false;
    JMSException lastFailure = null;
    long time = this.failureSleepTime;
    for (int i = 0; (!connected) && (i < this.retryCount); i++) {
      if (i > 0) {
        log.info("Sleeping for: " + time + " millis and trying again");
        try {
          Thread.sleep(time);
        }
        catch (InterruptedException e) {
          log.warn("Sleep interupted: " + e, e);
          // Keep the interrupt visible to callers and stop retrying.
          Thread.currentThread().interrupt();
          break;
        }
        time *= 2L;
      }

      // Fresh mutable copy for each pass; extractURI removes what it picks.
      List list = new ArrayList(Arrays.asList(this.uris));
      while ((!connected) && (!list.isEmpty())) {
        URI uri = extractURI(list);
        TransportChannel previous = this.channel;
        try {
          attemptToConnect(uri);
          configureChannel();
          connected = true;
          this.currentURI = uri;
        }
        catch (JMSException e) {
          lastFailure = e;
          log.info("Could not connect to: " + uri + ". Reason: " + e);
          discardFailedChannel(previous);
        }
      }
    }

    if (!connected) {
      throw createConnectFailure(lastFailure);
    }
  }

  /**
   * Stops and forgets the channel left behind by a failed connection attempt.
   * Does nothing when the attempt never replaced the channel field.
   *
   * @param previous the value of the channel field before the attempt
   */
  private void discardFailedChannel(TransportChannel previous)
  {
    TransportChannel failed = this.channel;
    if ((failed == null) || (failed == previous)) {
      return;
    }
    this.channel = null;
    try {
      failed.stop();
    }
    catch (Exception e) {
      // Best effort only: the channel is unusable and is being thrown away.
      log.debug("Ignoring failure while stopping unusable channel: " + e, e);
    }
  }

  /**
   * Builds the exception reported when no connection could be established.
   *
   * @param cause the last underlying failure, or <code>null</code> if there was none
   * @return an exception whose message lists the configured URIs
   */
  private JMSException createConnectFailure(Exception cause)
  {
    StringBuffer buffer = new StringBuffer("Failed to connect to resource(s): ");
    if (this.uris != null) {
      for (int i = 0; i < this.uris.length; i++) {
        if (i > 0) {
          buffer.append(",");
        }
        buffer.append(this.uris[i]);
      }
    }
    JMSException jmsEx = new JMSException(buffer.toString());
    if (cause != null) {
      jmsEx.setLinkedException(cause);
    }
    return jmsEx;
  }

  /**
   * @return the underlying channel, never <code>null</code>
   * @throws JMSException if there is currently no underlying channel
   */
  protected TransportChannel getChannel() throws JMSException
  {
    TransportChannel current = this.channel;
    if (current == null) {
      throw new JMSException("No TransportChannel connection available");
    }
    return current;
  }

  /**
   * Copies the listeners registered on this channel, where present, to the
   * underlying channel. Expects the underlying channel to be set.
   */
  protected void configureChannel() {
    ExceptionListener exceptionListener = getExceptionListener();
    if (exceptionListener != null) {
      this.channel.setExceptionListener(exceptionListener);
    }
    PacketListener packetListener = getPacketListener();
    if (packetListener != null) {
      this.channel.setPacketListener(packetListener);
    }
  }

  /**
   * Removes and returns a randomly chosen URI from the list.
   *
   * @param list a mutable list of {@link URI} instances; the chosen entry is removed
   * @return the chosen URI
   * @throws JMSException if the list is <code>null</code> or empty
   */
  protected URI extractURI(List list)
    throws JMSException
  {
    if ((list == null) || list.isEmpty()) {
      throw new JMSException("No URIs available to choose from");
    }
    int idx = 0;
    if (list.size() > 1) {
      // Math.random() is in [0, 1), so the product is always a valid index.
      idx = (int)(Math.random() * list.size());
    }
    return (URI)list.remove(idx);
  }

  /**
   * Creates a channel for the given URI, stores it as the underlying channel and,
   * if this channel has been started, starts it.
   *
   * @param uri the URI to connect to
   * @throws JMSException if the channel cannot be created or started
   */
  protected void attemptToConnect(URI uri) throws JMSException {
    this.channel = TransportChannelProvider.create(this.wireFormat, uri);
    if (this.started.get()) {
      this.channel.start();
    }
  }
}